package drds.data_propagate.parse;

import drds.data_propagate.binlog_event.BinLogEvent;
import drds.data_propagate.common.utils.JsonUtils;
import drds.data_propagate.driver.packets.server.ResultSetPacket;
import drds.data_propagate.entry.Entry;
import drds.data_propagate.entry.EntryType;
import drds.data_propagate.entry.position.BinLogEventPosition;
import drds.data_propagate.entry.position.EntryPosition;
import drds.data_propagate.parse.exception.ParseException;
import drds.data_propagate.parse.ha.HaController;
import drds.data_propagate.parse.table_meta_data.DatabaseTableMetaDataStore;
import drds.data_propagate.parse.table_meta_data.TableMetaDataStoreManager;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang.StringUtils;
import org.springframework.util.CollectionUtils;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.TimerTask;

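/**
 * Event parser implemented by replicating the binlog from a MySQL server.
 *
 * <pre>
 * 1. It does not control the MySQL master/standby switch itself; the switch is driven by the HA mechanism,
 *    e.g. by integrating with tddl/cobar or by the success rate of its own heartbeat packets.
 * 2. Switch mechanism
 * </pre>
 */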
public class MysqlEventParser extends AbstractMysqlEventParser implements EventParser, HaSwitchable {

    // HA controller; this class only stores it and exposes it through the Lombok accessors.
    @Setter
    @Getter
    public HaController haController = null;
    // heartbeat check settings
    @Setter
    @Getter
    public String heartBeatSql; // heartbeat sql
    // database connection settings
    @Setter
    @Getter
    protected AuthenticationInfo masterAuthenticationInfo; // master database
    @Setter
    @Getter
    protected AuthenticationInfo standbyAuthenticationInfo; // standby database
    @Setter
    @Getter
    // binlog position settings
    protected EntryPosition masterEntryPosition;
    @Setter
    @Getter
    protected EntryPosition standbyEntryPosition;
    @Setter
    @Getter
    private int defaultConnectionTimeoutInSeconds = 30; // so timeout; NOTE(review): not applied to the dumper anywhere in this class
    @Setter
    @Getter
    private int receiveBufferSize = 64 * 1024;
    @Setter
    @Getter
    private int sendBufferSize = 64 * 1024; // NOTE(review): not applied to the dumper anywhere in this class
    @Setter
    @Getter
    private long slaveId; // slave id used when connecting to mysql; generated in buildDumper when <= 0
    @Setter
    @Getter
    private DumperImpl dumper; // connection used to query meta information
    @Setter
    @Getter
    private TableMetaDataStoreManager tableMetaDataStoreManager; // the corresponding table meta manager
    @Setter
    @Getter
    private int fallbackIntervalInSeconds = 60; // how far (in seconds) to fall back in time when switching


    // update by yishun.chen: parameters for special exception handling
    @Setter
    @Getter
    private int dumpErrorCount = 0; // number of binlog dump failures counted so far
    @Setter
    @Getter
    private int dumpErrorCountThreshold = 2; // threshold for the binlog dump failure count
    @Setter
    @Getter
    private boolean rdsOssMode = false;

    /**
     * Builds a dumper for the authentication info this parser is currently running against.
     *
     * @return a new dumper configured for {@code runningAuthenticationInfo}
     */
    protected Dumper buildDumper() {
        final AuthenticationInfo current = this.runningAuthenticationInfo;
        return buildDumper(current);
    }

    /**
     * Prepares the meta-data side of the parser before dumping starts.
     *
     * <p>Only acts when the event parser is a {@code BinlogEventConvertToEntry}: a forked dumper is
     * connected and stored in {@code this.dumper}, a {@code DatabaseTableMetaDataStore} (if that is the
     * configured store type) is wired up and initialised, and a new {@code TableMetaDataStoreManager} is
     * handed to the event parser.
     *
     * @param dumper the dumper to fork the meta connection from
     * @throws ParseException if connecting the forked dumper fails with an {@link IOException}
     */
    protected void preDump(Dumper dumper) {
        // instanceof is already false for null, so no separate null check is needed
        if (!(binLogEventParser instanceof BinlogEventConvertToEntry)) {
            return;
        }

        this.dumper = (DumperImpl) dumper.fork();
        try {
            this.dumper.connect();
        } catch (IOException e) {
            throw new ParseException(e);
        }

        if (tableMetaDataStore instanceof DatabaseTableMetaDataStore) {
            DatabaseTableMetaDataStore databaseStore = (DatabaseTableMetaDataStore) tableMetaDataStore;
            databaseStore.setDumper(this.dumper);
            databaseStore.setFilter(eventFilter);
            databaseStore.setBlackFilter(eventBlackFilter);
            databaseStore.setSnapshotInterval(tsdbSnapshotInterval);
            databaseStore.setSnapshotExpire(tsdbSnapshotExpire);
            databaseStore.init(taskId);
        }

        tableMetaDataStoreManager = new TableMetaDataStoreManager(this.dumper, tableMetaDataStore);
        BinlogEventConvertToEntry converter = (BinlogEventConvertToEntry) binLogEventParser;
        converter.setTableMetaDataStoreManager(tableMetaDataStoreManager);
    }

    /**
     * Cleans up after a dump: delegates to the parent, validates the dumper argument and then
     * disconnects the meta dumper held in {@code this.dumper} (if any).
     *
     * @param dumper the dumper that was used for dumping
     * @throws ParseException if {@code dumper} is null or is not a {@code DumperImpl}
     */
    protected void afterDump(Dumper dumper) {
        super.afterDump(dumper);

        if (dumper == null) {
            throw new ParseException("illegal dumper is null");
        }
        if (!(dumper instanceof DumperImpl)) {
            throw new ParseException("Unsupported dumper eventType : " + dumper.getClass().getSimpleName());
        }

        final DumperImpl metaDumper = this.dumper;
        if (metaDumper == null) {
            return;
        }
        try {
            metaDumper.disconnect();
        } catch (IOException e) {
            // best effort: a failed disconnect is only logged
            log.error("ERROR # disconnect meta dumper for address:{}",
                    metaDumper.getConnector().getInetSocketAddress(), e);
        }
    }

    /**
     * Starts the parser. When no running authentication info has been chosen yet (first connection),
     * the master is selected before delegating to the parent.
     *
     * @throws ParseException as declared by the parent contract
     */
    public void start() throws ParseException {
        final boolean firstConnection = (runningAuthenticationInfo == null);
        if (firstConnection) {
            // first connection goes to the master
            runningAuthenticationInfo = masterAuthenticationInfo;
        }
        super.start();
    }

    /**
     * Stops the parser: disconnects the meta dumper (if any), clears cached table meta data (if a
     * manager exists) and then delegates to the parent.
     *
     * @throws ParseException as declared by the parent contract
     */
    public void stop() throws ParseException {
        final DumperImpl metaDumper = dumper;
        if (metaDumper != null) {
            try {
                metaDumper.disconnect();
            } catch (IOException e) {
                // best effort: a failed disconnect is only logged
                log.error("ERROR # disconnect meta dumper for address:{}",
                        metaDumper.getConnector().getInetSocketAddress(), e);
            }
        }

        final TableMetaDataStoreManager manager = tableMetaDataStoreManager;
        if (manager != null) {
            manager.clearTableMetaData();
        }

        super.stop();
    }

    /**
     * Builds the heartbeat timer task.
     *
     * <p>When heartbeat is enabled and a non-blank heartbeat SQL is configured, a
     * {@code HeartBeatTimeTask} running on a forked dumper is returned; otherwise the parent's task is
     * used.
     *
     * @param dumper the dumper to fork the heartbeat connection from
     * @return the timer task to schedule
     * @throws ParseException if {@code dumper} is not a {@code DumperImpl}
     */
    protected TimerTask buildHeartBeatTimeTask(Dumper dumper) {
        if (!(dumper instanceof DumperImpl)) {
            throw new ParseException("Unsupported dumper eventType : " + dumper.getClass().getSimpleName());
        }

        final boolean useSqlHeartBeat = heartBeatEnable && StringUtils.isNotBlank(heartBeatSql);
        if (!useSqlHeartBeat) {
            return super.buildHeartBeatTimeTask(dumper);
        }

        // start the mysql heartbeat sql on its own forked connection
        final DumperImpl heartBeatDumper = (DumperImpl) dumper.fork();
        return new HeartBeatTimeTask(this, heartBeatDumper);
    }

    /**
     * Stops the heartbeat and, when the task was a {@code HeartBeatTimeTask}, disconnects the dumper
     * that task was using.
     */
    protected void stopHeartBeat() {
        // read the task before delegating; presumably the parent resets the field - confirm
        final TimerTask task = this.heartBeatTimerTask;
        super.stopHeartBeat();

        if (!(task instanceof HeartBeatTimeTask)) {
            return;
        }

        final DumperImpl heartBeatDumper = ((HeartBeatTimeTask) task).getDumper1();
        try {
            heartBeatDumper.disconnect();
        } catch (IOException e) {
            // best effort: a failed disconnect is only logged
            log.error("ERROR # disconnect heartbeat dumper for address:{}",
                    heartBeatDumper.getConnector().getInetSocketAddress(), e);
        }
    }

    /**
     * Handles the master/standby switch: picks the standby when currently running on the master,
     * otherwise the master, and switches to it.
     */
    public void doSwitch() {
        final AuthenticationInfo target;
        if (runningAuthenticationInfo.equals(masterAuthenticationInfo)) {
            target = standbyAuthenticationInfo;
        } else {
            target = masterAuthenticationInfo;
        }
        this.doSwitch(target);
    }

    /**
     * Switches replication to the given authentication info.
     *
     * <pre>
     * 1. stop the replication that is currently running
     * 2. find the new position
     * 3. re-establish the connection and start replicating again
     * </pre>
     *
     * <p>Nothing happens (apart from a warning) when the target equals the running info or is null.
     *
     * @param authenticationInfo the target to switch to; may be null when no standby is configured
     */
    public void doSwitch(AuthenticationInfo authenticationInfo) {
        if (this.runningAuthenticationInfo.equals(authenticationInfo)) {
            String sameTargetMessage = "same runingInfo switch again : " + runningAuthenticationInfo.getInetSocketAddress().toString();
            log.warn(sameTargetMessage);
            return;
        }

        if (authenticationInfo == null) {
            String noStandbyMessage = "no standby config, just do nothing, will continue try:"
                    + runningAuthenticationInfo.getInetSocketAddress().toString();
            log.warn(noStandbyMessage);
            // sendAlarm(taskIdSequense, alarmMessage);
            return;
        }

        // switch ip: stop first, then log old/new, then restart against the new target
        stop();
        String switchMessage = "try decode ha switch, old:" + runningAuthenticationInfo.getInetSocketAddress().toString() + ", new:"
                + authenticationInfo.getInetSocketAddress().toString();
        log.warn(switchMessage);
        // sendAlarm(taskIdSequense, alarmMessage);
        runningAuthenticationInfo = authenticationInfo;
        start();
    }

    /**
     * Creates a {@code DumperImpl} for the given authentication info and applies the receive buffer
     * size, charset and slave id configured on this parser.
     *
     * @param authenticationInfo address, credentials and default database to connect with
     * @return the configured (not yet connected by this method) dumper
     */
    private DumperImpl buildDumper(AuthenticationInfo authenticationInfo) {
        final DumperImpl created = new DumperImpl(
                authenticationInfo.getInetSocketAddress(),
                authenticationInfo.getUsername(),
                authenticationInfo.getPassword(),
                connectionCharsetNumber,
                authenticationInfo.getDefaultDatabaseName());
        created.getConnector().setReceiveBufferSize(receiveBufferSize);
        created.setCharset(connectionCharset);

        // generate a slave id on first use when none was configured
        if (this.slaveId <= 0) {
            this.slaveId = generateUniqueSlaveId();
        }
        created.setSlaveId(this.slaveId);

        return created;
    }

    // =================== helper method =================

    /**
     * Derives a slave id from the local host address and the task id.
     *
     * <p>Layout of the result: the low 7 bits of {@code taskId.hashCode()} (0 when taskId is null) in
     * bits 24-30, followed by bytes 1, 2 and 3 of the local address. This mirrors the shell recipe
     * {@code server_id = b * 256 * 256 + c * 256 + d} for an address {@code a.b.c.d}, salted per task.
     *
     * @return the generated slave id
     * @throws ParseException if the local host cannot be resolved
     */
    private final long generateUniqueSlaveId() {
        final byte[] addr;
        try {
            addr = InetAddress.getLocalHost().getAddress();
        } catch (UnknownHostException e) {
            throw new ParseException("Unknown host", e);
        }

        final int salt = (taskId != null) ? taskId.hashCode() : 0;
        final int saltPart = (salt & 0x7f) << 24;
        final int secondOctet = (addr[1] & 0xff) << 16;
        final int thirdOctet = (addr[2] & 0xff) << 8;
        final int fourthOctet = addr[3] & 0xff;
        return saltPart + secondOctet + thirdOctet + fourthOctet;
    }

    /**
     * Finds the position from which dumping should start.
     *
     * <p>In GTID mode the latest persisted position wins, then the configured master position if it
     * carries a gtid. Otherwise (or when neither is available) the position is resolved by
     * {@link #findStartPositionInternal(Dumper)}. When {@code needTransactionPosition} is set, the
     * offset is moved back to the start of the enclosing transaction and the flag is cleared.
     *
     * @param dumper the connection used to look up positions
     * @return the start position, or {@code null} when no position could be resolved
     * @throws IOException if scanning for the transaction begin fails
     */
    protected EntryPosition findStartEntryPosition(Dumper dumper) throws IOException {
        if (isGtidMode()) {
            // in GTID mode take the last gtid from the position manager; if there is none, use the one from the instance configuration
            BinLogEventPosition binLogEventPosition = this.getBinLogEventPositionManager().getLatestBinLogEventPosition(taskId);
            if (binLogEventPosition != null) {
                return binLogEventPosition.getEntryPosition();
            }

            if (masterEntryPosition != null && StringUtils.isNotEmpty(masterEntryPosition.getGtid())) {
                return masterEntryPosition;
            }
        }

        EntryPosition startPosition = findStartPositionInternal(dumper);
        if (startPosition == null) {
            // findStartPositionInternal returns null on some paths (e.g. rds oss mode, timestamp search
            // without a hit). Return it as-is instead of dereferencing it below; the transaction flag is
            // left untouched so the correction still happens once a position is found.
            return null;
        }

        // work around the parsing problem of starting in the middle of a transaction
        if (needTransactionPosition.get()) {
            log.warn("prepare decode find last readedIndex : {}", startPosition.toString());
            Long preTransactionStartPosition = findTransactionBeginPosition(dumper, startPosition);
            if (!preTransactionStartPosition.equals(startPosition.getBinlogFileOffset())) {
                log.warn("find new start Transaction Position , old : {} , new : {}", startPosition.getBinlogFileOffset(),
                        preTransactionStartPosition);
                startPosition.setBinlogFileOffset(preTransactionStartPosition);
            }
            needTransactionPosition.compareAndSet(true, false);
        }
        return startPosition;
    }


    /**
     * Returns the latest binlog position of the server.
     *
     * <p>When a table meta store is configured, the latest binlog file is additionally scanned via
     * {@code findEntryPosition} (with the current time as the timestamp bound) and that result is
     * returned instead; it may be {@code null} when the scan finds nothing.
     *
     * @param dumper the connection used to query and scan
     * @return the resolved end position
     */
    protected EntryPosition findEndPositionWithMasterIdAndTimestamp(DumperImpl dumper) {
        final EntryPosition latest = findEndBinLogPosition(dumper);
        if (tableMetaDataStore == null) {
            return latest;
        }

        final long now = System.currentTimeMillis();
        return findEntryPosition(dumper, latest.getBinlogFileName(), latest, now, true);
    }

    /**
     * Completes a fixed (file + offset) position.
     *
     * <p>When a table meta store is configured and the fixed position has no positive execute
     * timestamp, the binlog file is scanned up to the fixed position and the position found by that
     * scan is returned. In every other case the fixed position is returned unchanged.
     *
     * @param dumper             the connection used to scan
     * @param fixedEntryPosition the configured position
     * @return the position to start from
     * @throws ParseException if the scan finds no position before the fixed one
     */
    protected EntryPosition findPositionWithMasterIdAndTimestamp(DumperImpl dumper, EntryPosition fixedEntryPosition) {
        if (tableMetaDataStore == null) {
            return fixedEntryPosition;
        }

        final Long configuredTimestamp = fixedEntryPosition.getExecuteTimestamp();
        if (configuredTimestamp != null && configuredTimestamp > 0) {
            return fixedEntryPosition;
        }

        // use a timestamp far in the future (now + 1024 years) so that only the offset bounds the scan
        final long farFutureTimestamp = System.currentTimeMillis() + 1024L * 365 * 24 * 3600 * 1000;
        final EntryPosition located = findEntryPosition(dumper, fixedEntryPosition.getBinlogFileName(), fixedEntryPosition, farFutureTimestamp, true);
        if (located == null) {
            throw new ParseException("[fixed executeTimestamp] can't found begin/commit readedIndex before with fixed readedIndex" + fixedEntryPosition.getBinlogFileName() + ":" + fixedEntryPosition.getBinlogFileOffset());
        }
        return located;
    }

    /**
     * Resolves the start position from the persisted position or, when none exists, from the
     * configured master/standby position.
     *
     * <p>Without a persisted position: the configured position matching the connected address is used
     * (falling back to the server's current end position); depending on which of file name, offset and
     * timestamp are set, the result is located by timestamp, by fixed offset, or from the file head.
     *
     * <p>With a persisted position: if it was recorded against the same address it is reused, unless
     * the dump error count exceeded its threshold, in which case a timestamp based search (server id
     * changed and no standby configured) or {@code null} (rds oss mode) may be returned. If it was
     * recorded against a different address, a timestamp based search with the fallback interval is used.
     *
     * @param dumper the connection; must be a {@code DumperImpl}
     * @return the start position, or {@code null} when none could be resolved
     */
    protected EntryPosition findStartPositionInternal(Dumper dumper) {
        DumperImpl dumper1 = (DumperImpl) dumper;
        BinLogEventPosition binLogEventPosition = binLogEventPositionManager.getLatestBinLogEventPosition(taskId);
        // no previously persisted position
        if (binLogEventPosition == null) {
            EntryPosition entryPosition = null;
            // pick the configured position that belongs to the connected host
            if (masterAuthenticationInfo != null && dumper1.getConnector().getInetSocketAddress().equals(masterAuthenticationInfo.getInetSocketAddress())) {
                entryPosition = masterEntryPosition;
            } else if (standbyAuthenticationInfo != null && dumper1.getConnector().getInetSocketAddress().equals(standbyAuthenticationInfo.getInetSocketAddress())) {
                entryPosition = standbyEntryPosition;
            }
            // last resort
            if (entryPosition == null) {
                entryPosition = findEndPositionWithMasterIdAndTimestamp(dumper1); // by default consume from the current last position
            }

            // decide whether to subscribe by time
            if (StringUtils.isEmpty(entryPosition.getBinlogFileName())) {
                // no binlog name given: try to search by timestamp
                if (entryPosition.getExecuteTimestamp() != null && entryPosition.getExecuteTimestamp() > 0L) {
                    log.warn("prepare decode find start readedIndex {}:{}:{}", new Object[]{"", "", entryPosition.getExecuteTimestamp()});
                    return findByStartTimeStamp(dumper1, entryPosition.getExecuteTimestamp());
                } else {
                    log.warn("prepare decode find start readedIndex just show master status");
                    return findEndPositionWithMasterIdAndTimestamp(dumper1); // by default consume from the current last position
                }
            } else {
                if (entryPosition.getBinlogFileOffset() != null && entryPosition.getBinlogFileOffset() > 0L) {
                    // binlog name + offset given: use it directly
                    entryPosition = findPositionWithMasterIdAndTimestamp(dumper1, entryPosition);
                    log.warn("prepare decode find start readedIndex {}:{}:{}", new Object[]{entryPosition.getBinlogFileName(), entryPosition.getBinlogFileOffset(), entryPosition.getExecuteTimestamp()});
                    return entryPosition;
                } else {
                    EntryPosition specificLogFilePosition = null;
                    if (entryPosition.getExecuteTimestamp() != null && entryPosition.getExecuteTimestamp() > 0L) {
                        // binlog name + timestamp given but no offset: try to find the offset by time
                        EntryPosition endPosition = findEndBinLogPosition(dumper1);
                        if (endPosition != null) {
                            log.warn("prepare decode find start readedIndex {}:{}:{}", new Object[]{entryPosition.getBinlogFileName(), "", entryPosition.getExecuteTimestamp()});
                            specificLogFilePosition = findEntryPosition(dumper1, entryPosition.getBinlogFileName(), endPosition, entryPosition.getExecuteTimestamp(), true);
                        }
                    }

                    if (specificLogFilePosition == null) {
                        // no position found: start from the head of the file
                        entryPosition.setBinlogFileOffset(BINLOG_START_OFFEST);
                        return entryPosition;
                    } else {
                        return specificLogFilePosition;
                    }
                }
            }
        } else {
            if (binLogEventPosition.getSlaveId().getServerInetSocketAddress().equals(dumper1.getConnector().getInetSocketAddress())) {
                if (dumpErrorCountThreshold >= 0 && dumpErrorCount > dumpErrorCountThreshold) {
                    // locating the binlog position failed; there are two possible reasons:
                    // 1. the binlog position has been purged
                    // 2. mysql behind a vip performed a master/standby switch; check whether the serverId
                    //    changed, and if so search for a suitable position by timestamp
                    boolean case2 = (standbyAuthenticationInfo == null || standbyAuthenticationInfo.getInetSocketAddress() == null)//
                            && binLogEventPosition.getEntryPosition().getServerId() != null//
                            && !binLogEventPosition.getEntryPosition().getServerId().equals(findServerId(dumper1));//
                    if (case2) {
                        long timestamp = binLogEventPosition.getEntryPosition().getExecuteTimestamp();
                        // 1000L keeps the multiplication in long arithmetic so large intervals cannot overflow int
                        long newStartTimestamp = timestamp - fallbackIntervalInSeconds * 1000L;
                        log.warn("prepare decode find start readedIndex by last readedIndex {}:{}:{}",
                                new Object[]{"", "", binLogEventPosition.getEntryPosition().getExecuteTimestamp()});
                        EntryPosition findPosition = findByStartTimeStamp(dumper1, newStartTimestamp);
                        // reset the error counter
                        dumpErrorCount = 0;
                        return findPosition;
                    }

                    Long timestamp = binLogEventPosition.getEntryPosition().getExecuteTimestamp();
                    if (isRdsOssMode() && (timestamp != null && timestamp > 0)) {
                        // the binlog position does not exist and a timestamp is available: return null so
                        // that the oss binlog handling can take over
                        return null;
                    }
                }
                // every other case
                log.warn("prepare decode find start readedIndex just last readedIndex\n {}", JsonUtils.marshalToString(binLogEventPosition));
                return binLogEventPosition.getEntryPosition();
            } else {
                // switched to another host: take the fallback interval into account
                // 1000L keeps the multiplication in long arithmetic so large intervals cannot overflow int
                long newStartTimestamp = binLogEventPosition.getEntryPosition().getExecuteTimestamp() - fallbackIntervalInSeconds * 1000L;
                log.warn("prepare decode find start readedIndex by switch {}:{}:{}", new Object[]{"", "", binLogEventPosition.getEntryPosition().getExecuteTimestamp()});
                return findByStartTimeStamp(dumper1, newStartTimestamp);
            }
        }
    }

    /**
     * Finds the offset of the transaction begin that precedes the wanted position.
     *
     * <p>The wanted position may point at row data; starting there would lose the first part of the
     * transaction (a transaction can run for several seconds, so matching on timestamp alone is not
     * enough). The binlog file is therefore scanned from offset 4 and the offset of the last
     * transaction begin seen before the wanted offset is remembered.
     *
     * @param dumper        the connection to scan with; it is reconnected first
     * @param entryPosition the wanted position
     * @return the remembered transaction begin offset, or 0 when none was seen
     * @throws IOException    if reconnecting or seeking fails
     * @throws ParseException if the remembered offset is greater than the wanted offset
     */
    private Long findTransactionBeginPosition(Dumper dumper, final EntryPosition entryPosition)
            throws IOException {
        // the first record may not be a Begin, so the binlog has to be scanned from its head
        final java.util.concurrent.atomic.AtomicLong nearestBeginOffset = new java.util.concurrent.atomic.AtomicLong(0L);

        final Sink<BinLogEvent> scanner = new Sink<BinLogEvent>() {

            private BinLogEventPosition lastSeenPosition;

            public boolean sink(BinLogEvent binLogEvent) {
                try {
                    Entry entry = binLogEventParser.parse(binLogEvent, true);
                    if (entry == null) {
                        return true;
                    }

                    long currentOffset = entry.getEntryHeader().getBinlogFileOffset();
                    long wantedOffset = entryPosition.getBinlogFileOffset();
                    if (currentOffset >= wantedOffset) {
                        return false;// reached the wanted position: stop scanning
                    }

                    // still before the wanted position: remember every transaction begin
                    if (entry.getEntryType() == EntryType.transaction_begin) {
                        nearestBeginOffset.set(currentOffset);
                    }

                    lastSeenPosition = buildLastTransactionEndBinLogEventPosition(entry);
                } catch (Exception e) {
                    processSinkError(e, lastSeenPosition, entryPosition.getBinlogFileName(),
                            entryPosition.getBinlogFileOffset());
                    return false;
                }

                return running;
            }
        };

        dumper.reconnect();
        dumper.seek(entryPosition.getBinlogFileName(), 4L, entryPosition.getGtid(), scanner);

        // sanity check on the transaction begin closest to the wanted position
        final long foundOffset = nearestBeginOffset.get();
        if (foundOffset > entryPosition.getBinlogFileOffset()) {
            log.error("preTransactionEndPosition greater than startPosition from zk or localconf, maybe lost data");
            throw new ParseException(
                    "preTransactionStartPosition greater than startPosition from zk or localconf, maybe lost data");
        }
        return foundOffset;
    }

    /**
     * Finds a binlog position by timestamp.
     *
     * <p>The search starts in the newest binlog file (from {@code show master status}) and walks
     * backwards one file at a time until {@code findEntryPosition} returns a position, the oldest
     * known file has been searched, sequence number 1 has been reached, or the parser stops running.
     * A failure while searching one file is logged and the search continues with the previous file.
     *
     * @param dumper         the connection used to query and scan
     * @param startTimestamp the timestamp (milliseconds) to search for
     * @return the position found, or {@code null} when none was found
     * @throws NumberFormatException if a binlog file name does not end in a numeric sequence
     */
    private EntryPosition findByStartTimeStamp(DumperImpl dumper, Long startTimestamp) {

        EntryPosition startEntryPosition = findStartBinLogPosition(dumper);
        String minBinlogFileName = startEntryPosition.getBinlogFileName();

        EntryPosition endEntryPosition = findEndBinLogPosition(dumper);
        String maxBinlogFileName = endEntryPosition.getBinlogFileName();
        log.info("show master status decode set search end pass:{} ", endEntryPosition);
        String startSearchBinlogFile = endEntryPosition.getBinlogFileName();

        while (running) {
            boolean reachedOldestFile = false;
            try {
                EntryPosition entryPosition = findEntryPosition(dumper, startSearchBinlogFile, endEntryPosition, startTimestamp, false);
                if (entryPosition != null) {
                    log.info("found and return:{} in findByStartTimeStamp operation.", entryPosition);
                    return entryPosition;
                }
                reachedOldestFile = StringUtils.equalsIgnoreCase(minBinlogFileName, startSearchBinlogFile);
            } catch (Exception e) {
                log.warn(String.format(
                        "the binlogfile:%s doesn't exist, decode continue decode search the next binlogfile , caused by",
                        startSearchBinlogFile), e);
            }

            if (reachedOldestFile) {
                // already at the earliest binlog file, no point in going further back
                log.warn("Didn't find the corresponding binlog_event files from {} decode {}", minBinlogFileName, maxBinlogFileName);
                break;
            }

            // keep searching in the previous binlog file
            String previousBinlogFile = resolvePreviousBinlogFileName(startSearchBinlogFile);
            if (previousBinlogFile == null) {
                log.warn("Didn't find the corresponding binlog_event files");
                break;
            }
            startSearchBinlogFile = previousBinlogFile;
        }
        // not found
        return null;
    }

    /**
     * Computes the name of the binlog file preceding the given one by decrementing its numeric suffix.
     *
     * @param binlogFileName a name of the form {@code <base>.<sequence>}
     * @return the previous file name, or {@code null} when the sequence is already 1 or lower
     * @throws NumberFormatException if the text after the last dot is not a number
     */
    private static String resolvePreviousBinlogFileName(String binlogFileName) {
        // split on the LAST dot: the sequence is the final extension, and the base name may itself contain dots
        int separatorIndex = binlogFileName.lastIndexOf('.');
        int binlogSeqNum = Integer.parseInt(binlogFileName.substring(separatorIndex + 1));
        if (binlogSeqNum <= 1) {
            return null;
        }
        String binlogFileNamePrefix = binlogFileName.substring(0, separatorIndex + 1);
        return binlogFileNamePrefix + String.format("%06d", binlogSeqNum - 1);
    }

    /**
     * Queries the server_id of the connected database.
     *
     * @param dumper the connection to query
     * @return the server id parsed from the second value of the result
     * @throws ParseException if the query fails with an {@link IOException} or returns no values
     */
    private Long findServerId(DumperImpl dumper) {
        try {
            final List<String> row = dumper.query("show variables like 'server_id'").getValueList();
            if (!CollectionUtils.isEmpty(row)) {
                // values are [variable name, value]
                return Long.valueOf(row.get(1));
            }
            throw new ParseException(
                    "command : show variables like 'server_id' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation");
        } catch (IOException e) {
            throw new ParseException("command : show variables like 'server_id' has an error!", e);
        }
    }

    /**
     * Queries the position of the first binlog event the server reports.
     *
     * <p>Runs {@code show binlog events limit 1} and builds the position from the first two values
     * of the result (binlog file name and offset).
     *
     * @param dumper the connection to query
     * @return the position of the first reported binlog event
     * @throws ParseException if the query fails with an {@link IOException} or returns no values
     */
    private EntryPosition findStartBinLogPosition(DumperImpl dumper) {
        try {
            ResultSetPacket resultSetPacket = dumper.query("show binlog events limit 1");
            List<String> valueList = resultSetPacket.getValueList();
            if (CollectionUtils.isEmpty(valueList)) {
                // the message must name the command that was actually sent, so operators can reproduce it
                throw new ParseException(
                        "command : 'show binlog events limit 1' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation");
            }
            EntryPosition entryPosition = new EntryPosition(valueList.get(0), Long.valueOf(valueList.get(1)));
            return entryPosition;
        } catch (IOException e) {
            throw new ParseException("command : 'show binlog events limit 1' has an error!", e);
        }

    }

    /**
     * Queries the latest binlog position via {@code show master status}.
     *
     * <p>The position is built from the first two values of the result (file name and offset). In
     * GTID mode, when the result has more than four values, the fifth one is stored as the gtid.
     *
     * @param dumper the connection to query
     * @return the latest binlog position
     * @throws ParseException if the query fails with an {@link IOException} or returns no values
     */
    private EntryPosition findEndBinLogPosition(DumperImpl dumper) {
        try {
            final List<String> row = dumper.query("show master status").getValueList();
            if (CollectionUtils.isEmpty(row)) {
                throw new ParseException(
                        "command : 'show master status' has an error! pls check. you need (at least one of) the SUPER,REPLICATION CLIENT privilege(s) for this operation");
            }

            // arguments: binlog file name, binlog file offset
            final EntryPosition latest = new EntryPosition(row.get(0), Long.valueOf(row.get(1)));
            if (isGtidMode() && row.size() > 4) {
                latest.setGtid(row.get(4));
            }
            return latest;
        } catch (IOException e) {
            throw new ParseException("command : 'show master status' has an error!", e);
        }
    }


    /**
     * Given a timestamp, finds in the specified binlog file the transaction boundary position that is
     * closest to, and earlier than, that timestamp. For the last binlog file an endPosition is
     * supplied so that the scan does not run endlessly.
     *
     * <p>The dumper is reconnected and the file is scanned from offset 4. The scan stops at the first
     * event whose timestamp is not earlier than {@code startTimestamp}, or whose end offset reaches
     * {@code endPosition} within the end position's file. An {@link IOException} from
     * reconnect/seek is logged and not rethrown; whatever position was recorded so far is returned.
     *
     * @param dumper                   the connection to scan with
     * @param searchBinlogFile         the binlog file to scan
     * @param endPosition              upper bound of the scan; its gtid is passed to seek
     * @param startTimestamp           timestamp bound in milliseconds
     * @param justForPositionTimestamp when true, the first event with a positive execute timestamp
     *                                 is recorded as an initial position
     * @return the last recorded position, or {@code null} when none was recorded
     */
    private EntryPosition findEntryPosition(//
                                            DumperImpl dumper,//
                                            final String searchBinlogFile, //
                                            final EntryPosition endPosition, //
                                            final Long startTimestamp, final Boolean justForPositionTimestamp//
                                            //
                                            //
    ) {//

        // holder for the result; the anonymous sink below writes the latest candidate into it
        final BinLogEventPosition binLogEventPosition = new BinLogEventPosition();
        try {
            dumper.reconnect();
            // start scanning the file
            dumper.seek(searchBinlogFile, 4L, endPosition.getGtid(), new Sink<BinLogEvent>() {

                private BinLogEventPosition lastBinLogEventPosition;

                public boolean sink(BinLogEvent binLogEvent) {
                    EntryPosition entryPosition = null;
                    try {
                        Entry entry = binLogEventParser.parse(binLogEvent, true);// parseAndProfilingIfNecessary(binLogEvent, true);
                        if (justForPositionTimestamp && binLogEventPosition.getEntryPosition() == null && binLogEvent.getExecuteTimeStamp() > 0) {
                            // initial position: start offset of this event (next position minus event length)
                            entryPosition = new EntryPosition(//
                                    binLogEvent.getServerId(), searchBinlogFile,//
                                    binLogEvent.getNextBinLogEventPosition() - binLogEvent.getEventLength(),//
                                    binLogEvent.getExecuteTimeStamp() * 1000 //
                            );//
                            entryPosition.setGtid(binLogEvent.getHeader().getGtidSetString());
                            binLogEventPosition.setEntryPosition(entryPosition);
                        }

                        // work with the event's own position directly; this avoids an endless loop when a binlog file yields no entries at all
                        String logFileName = binLogEvent.getHeader().getLogFileName();
                        // this is the binlog end offset,
                        // because the offset it is compared with is the end offset from show master status
                        Long logFileOffset = binLogEvent.getHeader().getNextBinLogEventPosition();
                        Long logFileTimestamp = binLogEvent.getHeader().getExecuteTimeStamp() * 1000;
                        Long serverId = binLogEvent.getHeader().getServerId();

                        // we are looking for events earlier than startTimestamp
                        //
                        // if even the earliest record does not satisfy the condition, exit right away
                        if (logFileTimestamp >= startTimestamp) {
                            return false;
                        }
                        // stop once the end position is reached inside the end position's file
                        if (StringUtils.equals(endPosition.getBinlogFileName(), logFileName) && logFileOffset >= endPosition.getBinlogFileOffset()) {
                            return false;
                        }
                        if (entry == null) {
                            return true;
                        }

                        // remember where the previous transaction ended, i.e. the position of the next transaction (position = current + data.length, the offset following this transaction), to avoid replaying a redundant transaction
                        if (EntryType.transaction_end.equals(entry.getEntryType())) {
                            entryPosition = new EntryPosition(serverId, logFileName, logFileOffset, logFileTimestamp);
                            if (log.isDebugEnabled()) {
                                log.debug(
                                        "set {} decode be pending start readedIndex before finding another proper one...",
                                        entryPosition);
                            }
                            binLogEventPosition.setEntryPosition(entryPosition);
                            entryPosition.setGtid(entry.getEntryHeader().getGtid());
                        } else if (EntryType.transaction_begin.equals(entry.getEntryType())) {
                            // position for the current transaction begin
                            // NOTE(review): logFileOffset is the event's next-event position (see above), so this records the offset after the begin event - confirm this is intended
                            entryPosition = new EntryPosition(serverId, logFileName, logFileOffset, logFileTimestamp);
                            if (log.isDebugEnabled()) {
                                log.debug(
                                        "set {} decode be pending start readedIndex before finding another proper one...",
                                        entryPosition);
                            }
                            entryPosition.setGtid(entry.getEntryHeader().getGtid());
                            binLogEventPosition.setEntryPosition(entryPosition);
                        } else {
                            //else
                            //binLogEventPosition.getEntryPosition()==null or !=null
                        }

                        lastBinLogEventPosition = buildLastTransactionEndBinLogEventPosition(entry);
                    } catch (Throwable e) {
                        // the error is handed to processSinkError; unless that throws, the scan goes on (see return below)
                        processSinkError(e, lastBinLogEventPosition, searchBinlogFile, 4L);
                    }

                    return running;
                }
            });

        } catch (IOException e) {
            // logged only; the method still returns whatever was recorded before the failure
            log.error("ERROR ## findEntryPosition has an error", e);
        }

        if (binLogEventPosition.getEntryPosition() != null) {
            return binLogEventPosition.getEntryPosition();
        } else {
            return null;
        }
    }

    /**
     * Counts binlog dump failures before delegating to the parent.
     *
     * <p>An {@link IOException} whose message contains {@code errno = 1236}
     * (ER_MASTER_FATAL_ERROR_READING_BINLOG) increments {@code dumpErrorCount}.
     *
     * @param e the error raised while dumping
     */
    @Override
    protected void processDumpError(Throwable e) {
        final boolean binlogReadFailure = (e instanceof IOException)
                && StringUtils.contains(e.getMessage(), "errno = 1236");
        if (binlogReadFailure) {
            dumpErrorCount++;
        }

        super.processDumpError(e);
    }


}
